---
title: "Elasticsearch sink"
sidebarTitle: "Elasticsearch sink"
description: "Stream Postgres changes directly to Elasticsearch with Sequin's Elasticsearch sink."
---

The **Elasticsearch sink** writes change data into an Elasticsearch index via the Elasticsearch REST API.

<Tip>
  This is the reference for the Elasticsearch sink. See the [quickstart](/quickstart/elasticsearch) for a step-by-step walkthrough or the [how-to guide](/how-to/stream-postgres-to-elasticsearch) for an explanation of how to use the Elasticsearch sink.
</Tip>

## Configuration

- **Endpoint URL**

    Base URL of your Elasticsearch cluster (for example, `https://your-es-server:9200`).

- **Index name**

    Name of the Elasticsearch index to write to. The index **must exist** before Sequin can stream data.

- **Authentication type**

    One of `api_key`, `basic`, or `bearer`.

- **Authentication value**

    Credential corresponding to the chosen authentication type.

- **Batch size** *(optional)*

    Maximum number of documents per bulk request (default `100`, maximum `10,000`).

### Authentication header formats

| Auth type | Example header |
|-----------|----------------|
| `api_key` | `Authorization: ApiKey <encoded-key>` |
| `basic`   | `Authorization: Basic <username:password>` |
| `bearer`  | `Authorization: Bearer <token>` |

Sequin constructs the header automatically. Supply only the value shown in angle brackets.
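
To make the table concrete, here is a minimal sketch of how such headers could be assembled. It is not Sequin's implementation: the `AuthHeaderSketch` module and its `authorization/2` function are hypothetical names, and the sketch assumes the `basic` value is base64-encoded before it is sent, as HTTP Basic authentication (RFC 7617) requires.

```elixir
defmodule AuthHeaderSketch do
  # Hypothetical helper, not Sequin's code: builds the Authorization header
  # value for each auth type listed in the table above.
  def authorization(:api_key, encoded_key), do: "ApiKey " <> encoded_key
  def authorization(:bearer, token), do: "Bearer " <> token

  # Assumption: the "username:password" value is base64-encoded on the wire,
  # as HTTP Basic authentication (RFC 7617) requires.
  def authorization(:basic, user_pass), do: "Basic " <> Base.encode64(user_pass)
end

# Example: print the header value for a placeholder username and password.
IO.puts(AuthHeaderSketch.authorization(:basic, "elastic:changeme"))
```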

## Transform requirements

Your [transform](/reference/transforms) must return a JSON document compatible with the target index's [mapping](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html).

Sequin sets the Elasticsearch document `_id` to the concatenation of the primary-key column values of the Postgres row. [Let us know](https://github.com/sequinstream/sequin/issues/new) if you need to use a different `_id` field.

Because the sink always uses the `index` bulk operation, every payload must contain the **full** document. Partial updates are not supported.

### Example transforms

Index the full document:
```elixir
def transform(_action, record, _changes, _metadata) do
  record
end
```

Index certain fields, renaming from Postgres column names to Elasticsearch field names:
```elixir
def transform(_action, record, _changes, _metadata) do
  %{
    id: record["product_id"],
    name: record["product_name"],
    description: record["product_description"]
  }
end
```

## Bulk import behaviour

For each batch, Sequin sends a request to the Elasticsearch [Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html):

`POST /{index_name}/_bulk`

| Change action | Bulk operation | Behaviour |
|---------------|---------------|-----------|
| `INSERT`, `UPDATE`, `READ` | `index` | Creates the document if missing; replaces it otherwise. |
| `DELETE` | `delete` | Removes the document by `_id`. |

All Bulk API payloads are newline-delimited JSON (NDJSON) and end with a newline as required by Elasticsearch.
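
The mapping and payload format above can be sketched as follows. This is an illustration under stated assumptions, not Sequin's implementation: the `BulkPayloadSketch` module is hypothetical, actions are assumed to arrive as lowercase strings, and the built-in `JSON` module requires Elixir 1.18 or later.

```elixir
defmodule BulkPayloadSketch do
  # Illustrative only, not Sequin's implementation.
  # Returns the NDJSON lines (as Elixir terms) for one change:
  # a delete needs only an action line; an index needs an action line
  # followed by the full document.
  def lines("delete", id, _doc), do: [%{delete: %{_id: id}}]

  def lines(action, id, doc) when action in ["insert", "update", "read"] do
    [%{index: %{_id: id}}, doc]
  end

  # Encodes every line as JSON and terminates each with a newline,
  # so the payload as a whole ends with "\n".
  def encode(changes) do
    changes
    |> Enum.flat_map(fn {action, id, doc} -> lines(action, id, doc) end)
    |> Enum.map_join(&(JSON.encode!(&1) <> "\n"))
  end
end

# Two changes: one indexed document and one delete (placeholder data).
payload =
  BulkPayloadSketch.encode([
    {"insert", "1", %{name: "Widget"}},
    {"delete", "2", nil}
  ])

IO.write(payload)
```

Because the index name is part of the request path, the action lines in this sketch carry only the `_id`.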

## API endpoints used by Sequin

- `POST /{index_name}/_bulk` – indexing, replacing, and deleting documents (always used, even for a single document).
- `POST /{index_name}/_search` – invoked by **Test Connection** in the console to verify connectivity.

No other Elasticsearch endpoints are called.

## Error handling

Typical errors surfaced in the Sequin console include:

- Connection or TLS failures.
- `404` – index not found.
- `401` / `403` – authentication or authorisation failure.
- Mapping conflicts (field type mismatches).
- Malformed bulk payload (usually due to transform output).

Errors from Elasticsearch are shown verbatim in the **Messages** tab. For bulk requests, Sequin maps each item failure to the corresponding document in the batch.
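
Item-level failures can be matched to documents because the Bulk API returns one entry in `items` per operation, in the order the operations were submitted. The sketch below shows one way to extract failures from a decoded response; the `BulkResponseSketch` module is hypothetical and is not Sequin's implementation, and the sample response is made up for illustration.

```elixir
defmodule BulkResponseSketch do
  # Illustrative only: pairs each failed bulk item with its position in the
  # batch, based on the documented Bulk API response shape.
  def failures(%{"errors" => false}), do: []

  def failures(%{"items" => items}) do
    items
    |> Enum.with_index()
    |> Enum.flat_map(fn {item, position} ->
      # Each item is a one-key map such as %{"index" => %{...}}.
      [{operation, result}] = Map.to_list(item)

      case result do
        %{"error" => error} -> [{position, operation, result["status"], error["reason"]}]
        _ok -> []
      end
    end)
  end
end

# Made-up response: the first item succeeded, the second hit a mapping error.
response = %{
  "errors" => true,
  "items" => [
    %{"index" => %{"_id" => "1", "status" => 201}},
    %{
      "index" => %{
        "_id" => "2",
        "status" => 400,
        "error" => %{
          "type" => "mapper_parsing_exception",
          "reason" => "failed to parse field [price]"
        }
      }
    }
  ]
}

failed = BulkResponseSketch.failures(response)
# failed == [{1, "index", 400, "failed to parse field [price]"}]
```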

## Limits

- **Batch size** is limited to `10,000` documents by Sequin. If your documents are large, a full batch can exceed Elasticsearch's default `http.max_content_length` (100 MB), so you may need a smaller batch size.
- One sink targets one index; multiple indices require multiple sinks.
- Only `index` and `delete` bulk operations are issued. Other operations (`update`, `create`, etc.) are not used.
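
As a rough guide for choosing a batch size, you can divide the request size limit by your average document size. The sketch below is a back-of-the-envelope estimate only: the `BatchSizeSketch` module is hypothetical, and the 64-byte per-document overhead for the bulk action line is an assumed value you should adjust for your data.

```elixir
defmodule BatchSizeSketch do
  # Sequin's documented batch size cap.
  @sequin_max_batch 10_000
  # Elasticsearch's default http.max_content_length (100 MB).
  @default_max_content_length 100 * 1024 * 1024

  # Rough upper bound on batch size for a given average document size.
  # per_doc_overhead approximates the action line that precedes each
  # document in the bulk payload (assumed value, not measured).
  def max_batch_size(avg_doc_bytes, per_doc_overhead \\ 64) do
    fits = div(@default_max_content_length, avg_doc_bytes + per_doc_overhead)
    min(@sequin_max_batch, fits)
  end
end

small_docs = BatchSizeSketch.max_batch_size(2_000)
# small_docs == 10_000 (Sequin's cap applies first)

large_docs = BatchSizeSketch.max_batch_size(50_000)
# large_docs == 2_094 (the request size limit applies first)
```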

## Routing

The Elasticsearch sink supports dynamic routing of the `index_name` with [routing functions](/reference/routing).

Example routing function:

```elixir
def route(_action, _record, _changes, metadata) do
  %{
    index_name: metadata.table_name
  }
end
```

Without a routing function, messages are indexed into the statically configured index.

